package org.jugile.util;

import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.Logger;
import org.jugile.util.Buffer;
import org.jugile.util.DBConnection;
import org.jugile.util.DBPool;
import org.jugile.util.Jugile;
import org.jugile.util.Time;

/**
 * Static helpers for a database-backed message queue.
 *
 * <p>Message bodies are stored in {@code dbmq_messages_t}; for every listener
 * node a link row is stored in {@code dbmq_queue_t} carrying a status code.
 * Connections are obtained from {@link DBPool} and always freed again before
 * the public methods return.
 */
public class DBQueue extends Jugile {
	static Logger log = Logger.getLogger(DBQueue.class);

	/** Status given to queue rows when they are inserted by {@code writeMessage}. */
	public final static long NEW = 0L;
	/** Status set by {@code getMessages} on the rows it has handed out. */
	public final static long INPROGRESS = 1L;
	/** Status code for finished entries (not written by this class). */
	public final static long DONE = 2L;
	/** Status code for failed entries (not written by this class). */
	public final static long ERROR = 3L;

	/** Pause between attempts after a non-transient connection error. */
	private static final long RETRY_DELAY_MS = 30000L;

	/**
	 * Writes a message and queues it for every listener node except the sender,
	 * committing on success.
	 *
	 * <p>On a {@link java.sql.SQLNonTransientConnectionException} the connection
	 * is freed, the thread sleeps 30 seconds and the write is attempted again
	 * with a fresh connection. Any other error is rolled back and reported
	 * through {@code fail}.
	 *
	 * @param msg   message body
	 * @param node  id of the sending node
	 * @param nodes ids of all nodes; {@code null} makes this call a no-op
	 */
	public static void writeMessage(String msg, String node, String nodes[]) {
		if (nodes == null) return;
		while (true) {
			DBConnection c = DBPool.getPool().getConnection();
			try {
				writeMessage(msg,node,nodes,c);
				c.commit();
				return;
			} catch (java.sql.SQLNonTransientConnectionException se) {
				log.info("connection error in write. retrying in 30 sec: " + node);
			} catch (Exception e) {
				rollbackQuietly(c);
				fail(e);
				return;
			} finally {
				// free before sleeping so a broken connection is not held during the wait
				release(c);
			}
			if (!sleepBeforeRetry()) return;
			log.info("retrying write now: " + node);
		}
	}

	/**
	 * Writes a message and its queue links using the given connection. The
	 * caller owns the transaction: nothing is committed or rolled back here.
	 *
	 * @param msg   message body
	 * @param node  id of the sending node; no queue row is created for it
	 * @param nodes ids of all nodes; {@code null} makes this call a no-op,
	 *              {@code null} entries are skipped
	 * @param c     connection to run the statements on
	 * @throws Exception whatever the connection raises
	 */
	public static void writeMessage(String msg, String node, String nodes[], DBConnection c) throws Exception {
		if (nodes == null) return;
		c.prepare("insert into dbmq_messages_t set nodeid=?, msg=?");
		c.param(node);
		c.param(msg);
		c.execute();

		List<List> res = c.select("select LAST_INSERT_ID()");
		// the driver may hand back Long, BigInteger, ... depending on column type
		long lastid = ((Number)res.get(0).get(0)).longValue();

		// put message link in to all listener queues
		String linkSql = "insert into dbmq_queue_t set msg_id=?, nodeid=?, status=?";
		for (String n : nodes) {
			if (n == null || n.equals(node)) continue; // other nodes except self
			c.prepare(linkSql);
			c.param(lastid);
			c.param(n);
			c.param(NEW);
			c.execute();
		}
	}

	/**
	 * Fetches up to {@code max} messages whose queue rows for {@code node} have
	 * status {@link #NEW} (oldest {@code ts} first when selecting) and marks
	 * those rows {@link #INPROGRESS}.
	 *
	 * <p>On a {@link java.sql.SQLNonTransientConnectionException} the connection
	 * is freed, the thread sleeps 30 seconds and the read is attempted again
	 * with a fresh connection. Any other error is rolled back and reported
	 * through {@code fail}.
	 *
	 * @param node id of the reading node
	 * @param max  maximum number of queue rows to pick up
	 * @return the messages read; empty when nothing is queued
	 */
	public static List<Msg> getMessages(String node, int max) {
		while (true) {
			DBConnection c = DBPool.getPool().getConnection();
			try {
				return readMessages(node, max, c);
			} catch (java.sql.SQLNonTransientConnectionException se) {
				log.info("connection error in read. retrying in 30 sec: " + node);
			} catch (Exception e) {
				rollbackQuietly(c);
				fail(e);
				return new ArrayList<Msg>();
			} finally {
				// free before sleeping so a broken connection is not held during the wait
				release(c);
			}
			if (!sleepBeforeRetry()) return new ArrayList<Msg>();
			log.info("retrying read now: " + node);
		}
	}

	/** Runs one read attempt of {@code getMessages} on the given connection. */
	private static List<Msg> readMessages(String node, int max, DBConnection c) throws Exception {
		List<Msg> msgs = new ArrayList<Msg>();

		String sql = "select msg_id, nodeid, status, ts from dbmq_queue_t ";
		sql += "where nodeid=? AND status=? ORDER BY ts LIMIT ?";
		c.prepare(sql);
		c.param(node);
		c.param(NEW);
		c.param(max);
		List<Long> ids = new ArrayList<Long>();
		for (List row : c.select()) {
			long msgId = ((Number)row.get(0)).longValue();
			ids.add(msgId);
		}
		if (ids.size() == 0) return msgs;

		// get messages
		// NOTE(review): this query has no ORDER BY, so the result order is
		// whatever the database returns - confirm callers do not rely on ts order.
		sql = "select msg, nodeid, id_f, ts from dbmq_messages_t ";
		sql += "where id_f IN ("+qmarks(ids.size())+")";
		c.prepare(sql);
		for (Long id : ids) { c.param(id); }
		for (List<Object> row : c.select()) {
			Msg m = new Msg((String)row.get(0));
			m.nodeid = (String)row.get(1);
			m.id = ((Number)row.get(2)).longValue();
			m.ts = new Time((java.util.Date)row.get(3));
			msgs.add(m);
		}
		// NOTE(review): presumably ends the read-only transaction before the
		// status update starts - kept exactly as in the original flow.
		c.rollback();

		// mark all status in progress
		sql = "update dbmq_queue_t set status=? ";
		sql += "where nodeid=? AND status=? AND msg_id IN ("+qmarks(ids.size())+")";
		c.prepare(sql);
		c.param(INPROGRESS);
		c.param(node);
		c.param(NEW);
		for (Long id : ids) { c.param(id); }
		int count = c.execute();
		if (count != ids.size()) fail("inconsistent status write: " + count + "/"+ ids.size());
		c.commit();
		log.debug("read messages from :" + node + " size: " + count);
		return msgs;
	}

	/**
	 * Deletes every queue row of the given node.
	 *
	 * @param name node id
	 * @return number of rows reported by the delete, or 0 if {@code fail} returns
	 */
	public static int flush(String name) {
		return deleteQueueRows(name, false);
	}

	/**
	 * Deletes the queue rows of the given node whose status is not {@link #NEW}.
	 *
	 * @param name node id
	 * @return number of rows reported by the delete, or 0 if {@code fail} returns
	 */
	public static int flushNotNew(String name) {
		return deleteQueueRows(name, true);
	}

	/** Shared delete for both flush variants; {@code keepNew} spares rows with status NEW. */
	private static int deleteQueueRows(String name, boolean keepNew) {
		DBConnection c = DBPool.getPool().getConnection();
		try {
			String sql = "delete from dbmq_queue_t where nodeid=?";
			if (keepNew) sql += " and status!=?";
			c.prepare(sql);
			c.param(name);
			if (keepNew) c.param(NEW);
			int res = c.execute();
			c.commit();
			return res;
		} catch (java.sql.SQLNonTransientConnectionException se) {
			// keep the cause in the log; fail(String) cannot carry it
			log.warn("connection error in flush: " + name, se);
			fail("connection error in flush: " + name);
		} catch (Exception e) {
			rollbackQuietly(c);
			fail(e);
		} finally {
			release(c);
		}
		return 0;
	}

	/**
	 * Rolls back, logging instead of failing when the rollback itself breaks,
	 * so the caller can still report the error that triggered the rollback.
	 */
	private static void rollbackQuietly(DBConnection c) {
		try {
			c.rollback();
		} catch (Exception e2) {
			log.warn("rollback failed", e2);
		}
	}

	/** Frees the connection; freeing is best effort, so problems are only logged. */
	private static void release(DBConnection c) {
		try {
			c.free();
		} catch (Exception e3) {
			log.warn("could not free connection", e3);
		}
	}

	/**
	 * Sleeps for the retry delay.
	 *
	 * @return {@code true} after a full sleep; {@code false} if the thread was
	 *         interrupted (interrupt flag restored and {@code fail} called)
	 */
	private static boolean sleepBeforeRetry() {
		try {
			Thread.sleep(RETRY_DELAY_MS);
			return true;
		} catch (InterruptedException ie) {
			Thread.currentThread().interrupt();
			fail(ie);
			return false;
		}
	}

	/** Builds {@code count} comma-separated {@code ?} placeholders, e.g. {@code ?,?,?}. */
	private static String qmarks(int count) {
		Buffer buf = new Buffer();
		for (int i = 0; i < count; i++) {
			if (i > 0) buf.add(",");
			buf.add("?");
		}
		return buf.toString();
	}
}
